Giới thiệu: Đối với các doanh nghiệp, thách thức cốt lõi của việc thu thập dữ liệu chưa bao giờ chỉ đơn thuần là “synchronization”, mà là làm thế nào để đảm bảo độ chính xác, tính toàn vẹn và tính kịp thời của dữ liệu trong một môi trường quy mô lớn, không đồng nhất và phức tạp. bài viết này tìm hiểu về thực tiễn của SUPCON trong việc xây dựng một khuôn khổ thu thập dữ liệu cấp doanh nghiệp dựa trên Apache SeaTunnel, tập trung vào việc chia sẻ những hiểu biết cụ thể và các giải pháp trong các khía cạnh như cấu hình độ sẵn có cao của cụm, tối ưu hóa hiệu suất, cơ chế dung sai sót và giám sát chất lượng dữ liệu. Giới thiệu: Đối với các doanh nghiệp, thách thức cốt lõi của việc thu thập dữ liệu chưa bao giờ chỉ đơn thuần là “synchronization”, mà là làm thế nào để đảm bảo độ chính xác, tính toàn vẹn và tính kịp thời của dữ liệu trong một môi trường quy mô lớn, không đồng nhất và phức tạp. bài viết này tìm hiểu về thực tiễn của SUPCON trong việc xây dựng một khuôn khổ thu thập dữ liệu cấp doanh nghiệp dựa trên Apache SeaTunnel, tập trung vào việc chia sẻ những hiểu biết cụ thể và các giải pháp trong các khía cạnh như cấu hình độ sẵn có cao của cụm, tối ưu hóa hiệu suất, cơ chế dung sai sót và giám sát chất lượng dữ liệu. Dilemma: Siloed Collection Architecture và Chi phí vận hành và bảo trì cao Là một công ty nền tảng AI công nghiệp sâu sắc trao quyền cho ngành công nghiệp quy trình, kinh doanh toàn cầu của SUPCON đã liên tục phát triển. Hiện nay, nó có gần 40 công ty con toàn cầu và phục vụ hơn 35.000 khách hàng toàn cầu. Sự mở rộng liên tục của kinh doanh đã đặt ra các yêu cầu cao hơn cho công việc dữ liệu: dữ liệu không chỉ cần phải được "tính toán nhanh chóng" mà còn "được hạ cánh chính xác". Với mục đích này, chúng tôi đã xây dựng một nền tảng dữ liệu lớn phân chia dòng để đối phó với các tình huống phức tạp. Tuy nhiên, sự phức tạp của chính nền tảng đã ngược lại làm tăng sự khó khăn của việc thu thập dữ liệu, phát triển và vận hành & bảo trì, đặc biệt là trong liên kết nguồn của thu thập dữ liệu, nơi chúng tôi đang đối mặt với những thách : Trong quá khứ, chúng tôi từ lâu đã dựa vào các giải pháp bao gồm nhiều công cụ (chẳng hạn như sử dụng Sqoop để đồng bộ dữ liệu lô với HDFS, và Maxwell/StreamSets để xử lý nhật ký cơ sở dữ liệu gia tăng và viết chúng vào Kafka/Kudu). (1) Complex Architecture with Silos : Nhiều tuyến đường kỹ thuật có nghĩa là tăng gấp đôi áp lực giám sát hoạt động và bảo trì. Thiếu một cơ chế giám sát và cảnh báo thống nhất có nghĩa là bất kỳ sự bất thường nào (như sự chậm trễ đồng bộ, cạn kiệt tài nguyên) đòi hỏi rất nhiều nhân lực để khắc phục sự cố và "chống hỏa hoạn", khiến việc đảm bảo sự ổn định trở nên khó khăn. (2) O&M Black Hole, Constantly Firefighting Khi đối mặt với các nguồn dữ liệu mới (như cơ sở dữ liệu trong nước và SAP HANA), chúng ta cần tìm giải pháp thích ứng trong các công cụ khác nhau hoặc phát triển các plugin một cách độc lập, điều này làm cho không thể đáp ứng nhanh chóng nhu cầu kinh doanh. (3) Segmented Capabilities, Difficult to Expand Con số trên cho thấy rõ hệ sinh thái thu thập phân cấp trước đây.Chúng tôi nhận ra rằng mô hình "không có tổ chức" này đã trở thành liên kết dễ bị tổn thương nhất trong xử lý dữ liệu.Không chỉ không phù hợp với tốc độ phát triển trong tương lai của công ty mà còn đặt ra mối đe dọa tiềm tàng đối với chất lượng và tính kịp thời của dữ liệu.Tạo ra một khuôn khổ thu thập dữ liệu thống nhất, ổn định và hiệu quả đã trở nên quan trọng và khẩn cấp. Breaking the Dilemma: Thoughts on a Unified Collection Framework and Technology Selection (Hãy phá vỡ tình huống khó khăn: Suy nghĩ về một khuôn khổ bộ sưu tập thống nhất và lựa chọn công nghệ) Sau khi phân tích và suy nghĩ kỹ lưỡng, chúng tôi đã làm rõ 5 tiêu chí lựa chọn cốt lõi cho các công nghệ mới: : Nó nên bao gồm đầy đủ tất cả các loại nguồn dữ liệu hiện tại và tương lai của công ty (từ MySQL, Oracle, HANA đến Kafka, StarRocks, v.v.) và hỗ trợ cả hai chế độ thu thập ngoại tuyến và thời gian thực, cơ bản giải quyết vấn đề của tập hợp công nghệ thống nhất. (1) Comprehensive Connectivity : Bản thân khung phải là một cụm phân tán có sẵn cao với khả năng dung nạp lỗi mạnh mẽ. Ngay cả khi một nút duy nhất thất bại, toàn bộ dịch vụ không nên bị gián đoạn và có thể phục hồi tự động, đảm bảo hoạt động liên tục của đường ống thu thập dữ liệu. (2) Cluster Stability and High Availability Ở cấp độ thực hiện nhiệm vụ, nó phải cung cấp chính xác-một lần hoặc ít nhất-một lần xử lý ngữ nghĩa để đảm bảo rằng các nhiệm vụ có thể tự động phục hồi từ điểm phá vỡ sau khi gián đoạn bất thường, loại bỏ trùng lặp hoặc mất dữ liệu, đó là nền tảng của chất lượng dữ liệu. (3) Reliable Data Consistency Guarantee : Nó phải có thể dễ dàng đối phó với những thách thức tăng cường dữ liệu hàng ngày của chúng tôi ở cấp TB. kiến trúc của nó nên hỗ trợ mở rộng ngang, và hiệu suất đồng bộ có thể được cải thiện tuyến tính bằng cách thêm các nút để đáp ứng nhu cầu tăng trưởng dữ liệu do sự phát triển nhanh chóng của doanh nghiệp. (4) Strong Throughput Performance Nó phải cung cấp một cơ chế giám sát và cảnh báo hoàn chỉnh, có thể theo dõi các chỉ số chính như bất thường, chậm trễ và thông lượng trong quá trình đồng bộ hóa dữ liệu trong thời gian thực, và thông báo cho nhân viên vận hành và bảo trì một cách kịp thời, chuyển đổi "đánh lửa" thụ động thành "cảnh báo sớm" tích cực. (5) Observable O&M Experience Dựa trên năm tiêu chí này, chúng tôi đã tiến hành nghiên cứu chuyên sâu và thử nghiệm so sánh về các giải pháp chính trong ngành. Cuối cùng, Apache SeaTunnel đã thực hiện tốt ở tất cả các kích thước và trở thành giải pháp tối ưu của chúng tôi để phá vỡ vấn đề. Our Core Requirements Apache SeaTunnel's Solutions Comprehensive Connectivity It has an extremely rich Connector ecosystem, officially supporting the reading and writing of hundreds of source/destination databases, fully covering all our data types. A single framework can unify offline and real-time collection. Cluster Stability and High Availability The separated architecture of SeaTunnel Engine ensures that even if a single Master or Worker node is abnormal, it will not affect the continuity of collection tasks. Reliable Data Consistency Guarantee It provides a powerful fault tolerance mechanism, supports Exactly-Once semantics, and can realize automatic breakpoint resumption after task abnormalities through the Checkpoint mechanism, ensuring no data loss or duplication. Strong Throughput Performance It has excellent distributed data processing capabilities. Parallelism can be adjusted through simple configuration, easily realizing horizontal expansion. Observable O&M Experience It provides rich monitoring indicators and can be seamlessly integrated with mainstream monitoring and alerting systems such as Prometheus, Grafana, and AlertManager, allowing us to have a clear understanding of the data collection process. Kết nối toàn diện Nó có một hệ sinh thái Connector cực kỳ phong phú, chính thức hỗ trợ việc đọc và viết hàng trăm cơ sở dữ liệu nguồn / đích, bao gồm đầy đủ tất cả các loại dữ liệu của chúng tôi. Cluster Stability và High Availability Kiến trúc riêng biệt của SeaTunnel Engine đảm bảo rằng ngay cả khi một nút Master hoặc Worker duy nhất là bất thường, nó sẽ không ảnh hưởng đến tính liên tục của các nhiệm vụ thu thập. Đảm bảo dữ liệu đáng tin cậy Nó cung cấp một cơ chế dung nạp lỗi mạnh mẽ, hỗ trợ ngữ nghĩa Exactly-Once, và có thể thực hiện việc khôi phục điểm phá vỡ tự động sau khi bất thường nhiệm vụ thông qua cơ chế Checkpoint, đảm bảo không mất dữ liệu hoặc trùng lặp. Hiệu suất Throughput Nó có khả năng xử lý dữ liệu phân tán tuyệt vời. song song có thể được điều chỉnh thông qua cấu hình đơn giản, dễ dàng thực hiện mở rộng ngang. Kinh nghiệm O&M Nó cung cấp các chỉ số giám sát phong phú và có thể được tích hợp liền mạch với các hệ thống giám sát và cảnh báo chính như Prometheus, Grafana và AlertManager, cho phép chúng tôi có một sự hiểu biết rõ ràng về quá trình thu thập dữ liệu. Thực tiễn: Kế hoạch thực hiện cụ thể và chi tiết Trong giai đoạn đầu, chúng tôi đã xây dựng dựa trên Apache SeaTunnel v2.3.5. Tại thời điểm đó, để đáp ứng một số nhu cầu cụ thể (chẳng hạn như xử lý các vấn đề nhạy cảm trường hợp của các tên bảng cơ sở dữ liệu khác nhau hoặc tên trường), chúng tôi đã thực hiện một số công việc phát triển thứ cấp. Tuy nhiên, với sự phát triển nhanh chóng của cộng đồng SeaTunnel, các chức năng và bộ chuyển đổi của phiên bản mới đã trở nên ngày càng đầy đủ.Khi chúng tôi nâng cấp thành công các cụm lên Apache SeaTunnel v2.3.11, chúng tôi rất ngạc nhiên khi thấy rằng các nhu cầu mà trước đây đòi hỏi sự phát triển tùy chỉnh bây giờ được hỗ trợ tự nhiên trong phiên bản mới. Hiện tại, tất cả các nhiệm vụ đồng bộ hóa dữ liệu của chúng tôi được thực hiện dựa trên phiên bản chính thức, đạt được không sửa đổi, điều này làm giảm đáng kể chi phí bảo trì lâu dài của chúng tôi và cho phép chúng tôi tận hưởng liền mạch các tính năng mới nhất và cải tiến hiệu suất do cộng đồng mang lại. Sau đây là các kế hoạch triển khai cốt lõi của chúng tôi dựa trên phiên bản v2.3.11, đã được xác minh bởi khối lượng dữ liệu cấp TB trong môi trường sản xuất và đặt nền tảng vững chắc cho hiệu suất tuyệt vời của 0 thất bại kể từ khi cụm được xây dựng. 1) Lập kế hoạch cluster Để đảm bảo tính sẵn có cao của cụm, chúng tôi khuyên bạn nên ưu tiên triển khai cụm chế độ riêng biệt. Node CPU Memory Disk JVM Heap Master-01 8C 32G 200G 30G Master-02 8C 32G 200G 30G Worker-01 16C 64G 500G 62G Worker-02 16C 64G 500G 62G Worker-03 16C 64G 500G 62G Thạc sĩ-01 8c 32g 200g 30g Trưởng 02 8c 32g 200g 30g Nhân Viên 01 16C 64g 500g 62g Nhân Viên 02 16C 64g 500g 62g Công nhân-03 16C 64g 500g 62g (2) Key Cluster Configuration File (tệp cấu hình cluster chính) This configuration file is mainly used to define the execution behavior, fault tolerance mechanism, and operation and maintenance monitoring settings of jobs. It optimizes performance by enabling class loading caching and dynamic resource allocation, and ensures job fault tolerance and data consistency by configuring S3-based Checkpoints. In addition, it can enable indicator collection, log management, and settings, thereby providing comprehensive support for the stable operation, monitoring, and daily management of jobs. seatunnel.yaml seatunnel: engine: # Class loader cache mode: After enabling, it can significantly improve performance when jobs are frequently started and stopped, reducing class loading overhead. It is recommended to enable it in the production environment. classloader-cache-mode: true # Expiration time of historical job data (unit: minutes): 3 days. Historical information of completed jobs exceeding this time will be automatically cleaned up. history-job-expire-minutes: 4320 # Number of data backups backup-count: 1 # Queue type: Blocking queue queue-type: blockingqueue # Execution information printing interval (seconds): Print job execution information in the log every 60 seconds. print-execution-info-interval: 60 # Job metric information printing interval (seconds): Print detailed metric information in the log every 60 seconds. print-job-metrics-info-interval: 60 slot-service: # Dynamic Slot management: After enabling, the engine will dynamically allocate computing slots based on node resource conditions, improving resource utilization. dynamic-slot: true # Checkpoint configuration. checkpoint: interval: 60000 # Time interval between two Checkpoints, in milliseconds (ms). Here it is 1 minute. timeout: 600000 # Timeout for Checkpoint execution, in milliseconds (ms). Here it is 10 minutes. storage: type: hdfs # The storage type is declared as HDFS here, and the actual storage is in the S3 below. max-retained: 3 # Maximum number of Checkpoint histories to retain. Old Checkpoints will be automatically deleted to save space. plugin-config: storage.type: s3 # The actual configured storage type is S3 (or object storage compatible with S3 protocol such as MinIO) fs.s3a.access.key: xxxxxxx # Access Key of S3-compatible storage fs.s3a.secret.key: xxxxxxx # Secret Key of S3-compatible storage fs.s3a.endpoint: http://xxxxxxxx:8060 # Service endpoint (Endpoint) address of S3-compatible storage s3.bucket: s3a://seatunel-pro-bucket # Name of the bucket used to store Checkpoint data fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider # Authentication credential provider # Observability configuration telemetry: metric: enabled: true # Enable metric collection logs: # Enable scheduled log deletion: Enable the automatic cleaning function of log files to prevent logs from filling up the disk. scheduled-deletion-enable: true # Web UI and REST API configuration http: enable-http: true # Enable Web UI and HTTP REST API services port: 8080 # Port number bound by the Web service enable-dynamic-port: false # Disable dynamic ports. Whether to enable other ports if 8080 is occupied. # The following is the Web UI basic authentication configuration enable-basic-auth: true # Enable basic identity authentication basic-auth-username: admin # Login username basic-auth-password: xxxxxxx # Login password This JVM parameter configuration file is mainly used to ensure the stability and performance of the SeaTunnel engine during large-scale data processing. It provides basic memory guarantee by setting the heap memory and metaspace capacity, and conducts a series of optimizations specifically for the G1 garbage collector to effectively manage memory garbage, control garbage collection pause time, and improve operating efficiency. jvm_master_options # JVM heap memory -Xms30g -Xmx30g # Memory overflow diagnosis: Automatically generate a Heap Dump file when OOM occurs, and save it to the specified path for subsequent analysis. -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server # Metaspace: Limit the maximum capacity to 5GB to prevent metadata from expanding infinitely and occupying too much local memory. -XX:MaxMetaspaceSize=5g # G1 garbage collector related configuration -XX:+UseG1GC # Enable G1 garbage collector -XX:+PrintGCDetails # Print detailed GC information in the log -Xloggc:/path/to/gc.log # Output GC logs to the specified file -XX:+PrintGCDateStamps # Print timestamps in GC logs -XX:MaxGCPauseMillis=5000 # The target maximum GC pause time is 5000 milliseconds (5 seconds) -XX:InitiatingHeapOccupancyPercent=50 # Start concurrent GC cycle when heap memory usage reaches 50% -XX:+UseStringDeduplication # Enable string deduplication to save memory space -XX:GCTimeRatio=4 # Set the target ratio of GC time to application time -XX:G1ReservePercent=15 # Reserve 15% of heap memory -XX:ConcGCThreads=6 # Set the number of threads used in the concurrent GC phase to 6 -XX:G1HeapRegionSize=32m # Set the G1 region size to 32MB This configuration file defines the underlying distributed architecture and collaboration mechanism of the SeaTunnel engine cluster. It is mainly used to establish and manage network communication between cluster nodes. The configuration also includes a high-precision failure detection heartbeat mechanism to ensure that node failure problems can be quickly detected and handled, ensuring the high availability of the cluster. At the same time, it enables distributed data persistence based on S3-compatible storage, reliably saving key state information to object storage. hazelcast-master.yaml (iMap stored in self-built object storage) hazelcast: cluster-name: seatunnel # Cluster name, which must be consistent across all nodes network: rest-api: enabled: true # Enable REST API endpoint-groups: CLUSTER_WRITE: enabled: true DATA: enabled: true join: tcp-ip: enabled: true # Use TCP/IP discovery mechanism member-list: # Cluster node list - 10.xx.xx.xxx:5801 - 10.xx.xx.xxx:5801 - 10.xx.xx.xxx:5802 - 10.xx.xx.xxx:5802 - 10.xx.xx.xxx:5802 port: auto-increment: false # Disable port auto-increment port: 5801 # Fixed port 5801 properties: hazelcast.invocation.max.retry.count: 20 # Maximum number of invocation retries hazelcast.tcp.join.port.try.count: 30 # Number of TCP connection port attempts hazelcast.logging.type: log4j2 # Use log4j2 logging framework hazelcast.operation.generic.thread.count: 50 # Number of generic operation threads hazelcast.heartbeat.failuredetector.type: phi-accrual # Use Phi-accrual failure detector hazelcast.heartbeat.interval.seconds: 2 # Heartbeat interval (seconds) hazelcast.max.no.heartbeat.seconds: 180 # No heartbeat timeout (seconds) hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10 # Failure detection threshold hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200 # Detection sample size hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100 # Minimum standard deviation (milliseconds) hazelcast.operation.call.timeout.millis: 150000 # Operation call timeout (milliseconds) map: engine*: map-store: enabled: true # Enable Map storage persistence initial-mode: EAGER # Load all data immediately at startup factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory # Persistence factory class properties: type: hdfs # Storage type namespace: /seatunnel/imap # Namespace path clusterName: seatunnel-cluster # Cluster name storage.type: s3 # Actually use S3-compatible storage fs.s3a.access.key: xxxxxxxxxxxxxxxx # S3 access key fs.s3a.secret.key: xxxxxxxxxxxxxxxx # S3 secret key fs.s3a.endpoint: http://xxxxxxx:8060 # S3 endpoint address s3.bucket: s3a://seatunel-pro-bucket # S3 storage bucket name fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider # Authentication provider (3) Thu thập ví dụ nhiệm vụ ① MySQL-CDC to StarRocks Để thu thập dữ liệu MySQL-CDC, cần phải đảm bảo rằng cơ sở dữ liệu nguồn đã kích hoạt Binlog với định dạng ROW, người dùng có quyền thích hợp, và gói MySQL Jar tương ứng được đặt trong Đối với các chi tiết, xin vui lòng tham khảo trang web chính thức: . ${SEATUNNEL_HOME}/lib https://seatunnel.apache.org/docs/2.3.11/connector-v2/source/MySQL-CDC Sau đây là một cấu hình mẫu cho bộ sưu tập MySQL-CDC của chúng tôi. env { parallelism = 1 # Parallelism is set to 1; only 1 is allowed for streaming collection job.mode = "STREAMING" # Streaming job mode job.name = cdh2sr # Job name identifier job.retry.times = 3 # Number of retries if the job fails job.retry.interval.seconds=180 # Retry interval (in seconds) } source { MySQL-CDC { base-url = "jdbc:mysql://xxxxxxx:3306/databasename" # MySQL connection address username = "xxxxxxr" # Database username password = "xxxxxx" # Database password table-names = ["databasename.table1","databasename_pro.table2"] # List of tables to sync (format: database.table name) startup.mode = "latest" # Start syncing from the latest position exactly_once = true # Enable Exactly-Once semantics debezium { include.schema.changes = "false" # Exclude schema changes snapshot.mode = when_needed # Take snapshots on demand } } } transform { TableRename { plugin_input = "cdc" # Input plugin identifier plugin_output = "rs" # Output plugin identifier convert_case = "LOWER" # Convert table names to lowercase prefix = "ods_cdh_databasename_" # Add prefix to table names } } sink { StarRocks { plugin_input = "rs" # Input plugin identifier (consistent with transform output) nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE node addresses base-url = "jdbc:mysql://xxxxxxx:3307" # StarRocks MySQL protocol address username = "xxxx" # StarRocks username password ="xxxxxxx" # StarRocks password database = "ods" # Target database enable_upsert_delete = true # Enable update/delete functionality max_retries = 3 # Number of retries if write fails http_socket_timeout_ms = 360000 # HTTP timeout (in milliseconds) retry_backoff_multiplier_ms = 2000 # Retry backoff multiplier max_retry_backoff_ms = 20000 # Maximum retry backoff time batch_max_rows = 2048 # Maximum number of rows per batch batch_max_bytes = 50000000 # Maximum bytes per batch } } ② Oracle-CDC to StarRocks Để thu thập dữ liệu Oracle-CDC, đảm bảo cơ sở dữ liệu nguồn có Logminer được bật, người dùng có quyền thích hợp, và đặt các gói OJDBC.Jar và Orai18n.jar tương ứng trong Địa chỉ. Để biết chi tiết, hãy tham khảo trang web chính thức: . ${SEATUNNEL_HOME}/lib https://seatunnel.apache.org/docs/2.3.11/connector-v2/source/Oracle-CDC Đặc biệt, liên quan đến các vấn đề về độ trễ gặp phải trong quá trình thu thập Oracle-CDC, chúng tôi khuyên bạn nên yêu cầu DBA kiểm tra tần suất chuyển đổi các nhật ký Logminer. Khuyến nghị chính thức là giữ nó khoảng 10 lần mỗi giờ – chuyển đổi quá thường xuyên có thể gây ra độ trễ kéo dài. Nếu tần số quá cao, hãy tăng kích thước của các tệp nhật ký riêng lẻ. -- Query log switch frequency SELECT GROUP#, THREAD#, BYTES/1024/1024 || 'MB' "SIZE", ARCHIVED, STATUS FROM V$LOG; SELECT TO_CHAR(first_time, 'YYYY-MM-DD HH24') AS hour, COUNT(*) AS switch_count FROM v$log_history WHERE first_time >= TRUNC(SYSDATE) - 1 -- Data from the past day GROUP BY TO_CHAR(first_time, 'YYYY-MM-DD HH24') ORDER BY hour; -- Query log file size SELECT F.MEMBER, L.GROUP#, L.THREAD#, L.SEQUENCE#, L.BYTES/1024/1024 AS SIZE_MB, L.ARCHIVED, L.STATUS, L.FIRST_CHANGE#, L.NEXT_CHANGE# FROM V$LOG L, V$LOGFILE F WHERE F.GROUP# = L.GROUP# ORDER BY L.GROUP#; Sau đây là một cấu hình mẫu cho bộ sưu tập Oracle-CDC của chúng tôi. env { parallelism = 1 # Parallelism is 1; only 1 is allowed for streaming collection job.mode = "STREAMING" # Streaming job mode job.name = bpm2sr # Job name identifier job.retry.times = 3 # Number of retries if the job fails job.retry.interval.seconds=180 # Retry interval (in seconds) } source { Oracle-CDC { plugin_output = "cdc" # Output plugin identifier base-url = "jdbc:oracle:thin:@xxxxxx:1521:DB" # Oracle connection address username = "xxxxxx" # Database username password = "xxxxxx" # Database password table-names = ["DB.SC.TABLE1","DB.SC.TABLE2"] # Tables to sync (format: database.schema.table name) startup.mode = "latest" # Start syncing from the latest position database-names = ["DB"] # Database name schema-names = ["SC"] # Schema name skip_analyze = true # Skip table analysis use_select_count = true # Use statistics exactly_once = true # Enable Exactly-Once semantics connection.pool.size = 20 # Connection pool size debezium { log.mining.strategy = "online_catalog" # Log mining strategy log.mining.continuous.mine = true # Continuously mine logs lob.enabled = false # Disable LOB support internal.log.mining.dml.parser ="legacy" # Use legacy DML parser } } } transform { TableRename { plugin_input = "cdc" # Input plugin identifier plugin_output = "rs" # Output plugin identifier convert_case = "LOWER" # Convert table names to lowercase prefix = "ods_crm_db_" # Add prefix to table names } } sink { StarRocks { plugin_input = "rs" # Input plugin identifier nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE nodes base-url = "jdbc:mysql://xxxxxxx:3307" # JDBC connection address username = "xxxx" # Username password ="xxxxxxx" # Password database = "ods" # Target database enable_upsert_delete = true # Enable update/delete max_retries = 3 # Maximum number of retries http_socket_timeout_ms = 360000 # HTTP timeout retry_backoff_multiplier_ms = 2000 # Retry backoff multiplier max_retry_backoff_ms = 20000 # Maximum retry backoff time batch_max_rows = 2048 # Maximum rows per batch batch_max_bytes = 50000000 # Maximum bytes per batch } } (4) Giám sát quan sát Nhờ các số liệu giám sát mạnh mẽ được cung cấp bởi phiên bản mới của SeaTunnel và hệ thống giám sát toàn diện mà chúng tôi đã xây dựng, chúng tôi có thể nắm bắt đầy đủ tình trạng của nền tảng thu thập dữ liệu từ cả quan điểm toàn nhóm và cấp nhiệm vụ. hệ thống giám sát của chúng tôi chủ yếu bao gồm hai chiều hướng sau: ① Cluster Monitoring Tình trạng nút: Theo dõi theo thời gian thực về số lượng nút cluster và trạng thái tồn tại của chúng để đảm bảo không có nút Worker bất thường và đảm bảo khả năng xử lý cluster. Cluster throughput: Theo dõi tổng số SourceReceivedQPS và SinkWriteQPS của cụm để nắm bắt tỷ lệ lưu lượng và lưu lượng dữ liệu toàn cầu và đánh giá tải trọng cụm. Tình trạng tài nguyên: Giám sát CPU và bộ nhớ của các nút cluster để cung cấp cơ sở cho việc mở rộng hoặc tối ưu hóa tài nguyên. Sức khỏe mạng: Đảm bảo điều kiện mạng cluster tốt bằng cách theo dõi nhịp tim nội bộ và độ trễ giao tiếp. ② Task Monitoring Tình trạng hoạt động nhiệm vụ: Kiểm tra thời gian thực trạng trạng thái đang chạy (Run/Failed/Finished) của tất cả các nhiệm vụ là yêu cầu cơ bản nhất của giám sát. Âm lượng đồng bộ hóa dữ liệu: Theo dõi SourceReceivedCount và SinkWriteCount của mỗi nhiệm vụ để nắm bắt thông lượng của từng đường ống dữ liệu trong thời gian thực. Thời gian chậm trễ: Đây là một trong những chỉ số quan trọng nhất cho các nhiệm vụ CDC. Các cảnh báo được gửi khi sự chậm trễ liên tục xảy ra ở cuối bộ sưu tập. Kết quả: Lợi ích có thể đo lường Sau một thời gian hoạt động ổn định, khuôn khổ thu thập dữ liệu thế hệ mới được xây dựng dựa trên Apache SeaTunnel đã mang lại cho chúng tôi những lợi ích đáng kể và có thể đo lường được, chủ yếu được phản ánh trong các khía cạnh sau: (1) Sự ổn định: Từ "Hỏa hoạn liên tục" đến "Hòa bình tâm trí" : Under the old solution, 1-3 synchronization abnormalities needed to be handled per month. Since the new cluster was launched, core data synchronization tasks have maintained 0 failures, with no data service interruptions caused by the framework itself. Task failure rate reduced by over 99% : Relying on Apache SeaTunnel's Exactly-Once semantics and powerful Checkpoint mechanism, end-to-end Exactly-Once processing is achieved, completely solving the problem of potential trace data duplication or loss and fundamentally ensuring data quality. 100% data consistency : The high-availability design of the cluster ensures 99.99% service availability. Any single-point failure can be automatically recovered within minutes, with no impact on business operations. Significantly improved availability (2) Hiệu quả: Phát triển gấp đôi và hiệu quả O&M : From writing and maintaining multiple sets of scripts in the past to unified configuration-based development. The time to connect new data sources has been reduced from 1-2 person-days to within 1 minute, showing a significant efficiency improvement. 50% improvement in development efficiency : Now, the overall status can be monitored through the Grafana dashboard, with daily active O&M investment of less than 0.5 person-hours. 70% reduction in O&M costs : End-to-end data latency has been optimized from minutes to seconds, providing a solid foundation for real-time data analysis and decision-making. Optimized data timeliness (3) Kiến trúc: Tối ưu hóa tài nguyên và Khung thống nhất : Successfully integrated multiple technology stacks such as Sqoop and StreamSets into Apache SeaTunnel, greatly reducing technical complexity and long-term maintenance costs. Unified technology stack Chương 5: Kế hoạch tương lai : We will actively explore the native deployment and scheduling capabilities of Apache SeaTunnel on Kubernetes, leveraging its elastic scaling features to achieve on-demand allocation of computing resources, further optimizing costs and efficiency, and better embracing hybrid cloud and multi-cloud strategies. (1) Full cloud native adoption : Build AIOps capabilities based on the rich Metrics data collected, realizing intelligent prediction of task performance, automatic root cause analysis of faults, and intelligent parameter tuning. (2) Intelligent O&M 6 - Sự công nhận Ở đây, chúng tôi chân thành cảm ơn cộng đồng mã nguồn mở Apache SeaTunnel. Đồng thời, chúng tôi cũng cảm ơn tất cả các thành viên trong nhóm dự án nội bộ của công ty – công việc chăm chỉ và can đảm khám phá của bạn là chìa khóa để thực hiện thành công nâng cấp kiến trúc này. SUPCON phá vỡ các công cụ dữ liệu silos cho Apache SeaTunnel—bây giờ các tác vụ đồng bộ cốt lõi chạy 0-failure! 99% thất bại thấp hơn, sự nhất quán 100%, 70% chi phí O&M ít hơn.